package pb.wang.streaming.kafka

import java.util.{Date, UUID}

import org.apache.ignite.Ignition
import org.apache.ignite.configuration.IgniteConfiguration
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi
import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.JavaConversions._
import scala.collection.mutable

/**
  * Created by admin on 2016/3/29.
  */
case class Joined(uuid: String, time: Date, imp: String, clk: String)

case class Imp(uuid: String, time: Date, imp: String)

case class Clk(uuid: String, time: Date, clk: String)

object JoinIgniteCache {

  val (imp, clk) = {
    val cfg = new IgniteConfiguration()
    val spi = new TcpDiscoverySpi();
    val ipFinder = new TcpDiscoveryMulticastIpFinder();
    ipFinder.setAddresses(List("127.0.0.1:47500"));
    spi.setIpFinder(ipFinder);
    cfg.setDiscoverySpi(spi);
    val ignite = Ignition.start(cfg)
    (ignite.getOrCreateCache[String, Imp]("Imp"), ignite.getOrCreateCache[String, Clk]("Clk"))
  }
}

object IgniteJoiner {

  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("IgniteJoiner").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(3))
    //ssc.checkpoint("checkpoint2")
    val imp = mutable.Queue[RDD[Object]]()
    val clk = mutable.Queue[RDD[Object]]()

    val streams = ssc.queueStream(imp, false).union(ssc.queueStream(clk, false))
    streams.flatMap(obj => {
      obj match {
        case imp: Imp => {

          val clk = JoinIgniteCache.clk.get(imp.uuid)
          // println(imp,clk)

          if (clk != null) {
            JoinIgniteCache.clk.remove(clk.uuid)
            Some(Joined(imp.uuid, imp.time, imp.imp, clk.clk))
          }
          else {
            JoinIgniteCache.imp.put(imp.uuid, imp)
            None
          }
        }
        case clk: Clk => {
          val imp = JoinIgniteCache.imp.get(clk.uuid)
          // println(imp,clk)

          if (imp != null) {
            JoinIgniteCache.imp.remove(imp.uuid)
            Some(Joined(clk.uuid, clk.time, imp.imp, clk.clk))
          }
          else {
            JoinIgniteCache.clk.put(clk.uuid, clk)
            None
          }
        }
        case _ => None
      }
    }).foreachRDD(_.foreachPartition(_.foreach(println(_))))

    ssc.start()
    for (x <- 0 to 100000000) yield {
      val uuid = UUID.randomUUID().toString
      val timeStamp = new Date()
      imp += sc.makeRDD(Seq(Imp(uuid, timeStamp, s"imp-$x")))
      clk += sc.makeRDD(Seq(Clk(uuid, timeStamp, s"clk-$x")))

    }
    ssc.awaitTermination()
  }
}